package server

import (
	"context"
	"fmt"
	"github.com/IBM/sarama"
	"kratos_kafka/internal/conf"
	"kratos_kafka/internal/service"
	"os"
	"os/signal"
	"time"
)

type RealConsumer struct {
	// 将GreeterService放进去方便业务处理～
	greeterService *service.GreeterService
}

func NewRealConsumer(greeterService *service.GreeterService) *RealConsumer {
	return &RealConsumer{
		greeterService: greeterService,
	}
}

// Notice 详细的使用参考 https://juejin.cn/post/6999263126713696293#heading-4
func (c *RealConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// 轮询消息...
	for message := range claim.Messages() {
		fmt.Printf("real_topic收到了消息消息>>>: partition: %v, offset: %v, value: %v, mgsTimestamp: %v, \n", message.Partition, message.Offset, string(message.Value), message.Timestamp)
		// TODO 需要业务处理的话 先json解析一下.....

		// Notice MarkMessage！
		session.MarkMessage(message, "")
	}
	return nil
}

func (c *RealConsumer) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

func (c *RealConsumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// 正常的 sarama-kafka consumers
func NewSaramaKafkaRealConsumerServer(bootConf *conf.Bootstrap, greeterService *service.GreeterService) *sarama.ConsumerGroup {

	ctx, cancel := context.WithCancel(context.Background())

	// Notice 如何使用TLS连接kafka，参考 tests/sarama_kafka_tls 中的代码～

	// NewConfig的时候初始化了各种配置～
	realConsumerConfig := sarama.NewConfig()
	// Notice 消费者配置 用源码包一定要看一下它的配置
	realConsumerConfig.Consumer.Return.Errors = true // 改成true
	// 未找到组消费位移的时候从哪边开始消费 TODO 默认是 OffsetNewest 从最新的偏移量开始消费
	realConsumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
	//realConsumerConfig.Consumer.Offsets.Initial = sarama.OffsetOldest # 从最早的偏移量开始消费
	realConsumerConfig.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
	realConsumerConfig.Consumer.Offsets.Retry.Max = 3

	// Notice 下面的配置跟默认配置一样!!!
	realConsumerConfig.Consumer.Fetch.Min = 1
	realConsumerConfig.Consumer.Fetch.Default = 1024 * 1024
	realConsumerConfig.Consumer.Retry.Backoff = 2 * time.Second
	realConsumerConfig.Consumer.MaxWaitTime = 250 * time.Millisecond
	realConsumerConfig.Consumer.MaxProcessingTime = 100 * time.Millisecond

	// Notice 自动提交模式下已经能解决丢消息问题,没有特殊需求不需要手动提交
	// 手动提交消息参考下面的文章: https://juejin.cn/post/6999263126713696293#heading-5
	realConsumerConfig.Consumer.Offsets.AutoCommit.Enable = true

	// Notice new consumer group
	realConsumerGroup, errRealConsumerGroup := sarama.NewConsumerGroup(
		// addr
		bootConf.DelayQueue.SaramaKafka.Server.Addr,
		// group
		bootConf.DelayQueue.SaramaKafka.RealGroup,
		realConsumerConfig,
	)
	if errRealConsumerGroup != nil {
		// 直接panic
		panic(errRealConsumerGroup)
	}

	// 监听信号，用于优雅地停止消费者 Notice 防止 goroutine泄漏！
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt, os.Kill)
	// 使用自己定义的 RealConsumer 消费消息 重写 ConsumeClaim 方法处理消费的数据
	// 将greeterService当作参数传进去～
	currRealConsumer := NewRealConsumer(greeterService)
	go func() {
		// Notice 死循环...
		for {
			select {
			case err := <-realConsumerGroup.Errors():
				if err != nil {
					fmt.Println("realConsumerGroup.Errors():", err)
				}
			// TODO ... 最好写一个 clean函数 将取消信号加进去～～
			case <-signals:
				// 但是实际上 如果先启动项目然后把项目停止，接着用tests/sarama_partitions中的脚本往相同的topic发消息，然后再启动项目还是能消费到消息
				// 说明项目停止后这个消费的子协程也停掉不会再继续消费消息了
				printStopSignal()
				//fmt.Println("收到了停止的信号!!! 停止realConsumer...")
				// Notice Close
				realConsumerGroup.Close()
				cancel()
				return
			default:
				errConsume := realConsumerGroup.Consume(
					ctx,
					// topic
					[]string{bootConf.DelayQueue.SaramaKafka.RealTopic},
					// Notice 重写了ConsumeClaim方法，业务中主要的处理逻辑写在这个方法里面
					currRealConsumer,
				)
				if errConsume != nil {
					fmt.Println("realConsumerGroup消费消息发生错误! err: ", errConsume)
				}
			}
		}
	}()

	return &realConsumerGroup
}

func printMsgTest(msg string) {

	fmt.Println("printMsgTest..............", msg)

}

func printStopSignal() {
	fmt.Println("收到了停止的信号!!! 停止realConsumer...")
}
